1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package rx;
17
18 import static org.junit.Assert.*;
19
20 import java.util.List;
21 import java.util.concurrent.*;
22 import java.util.concurrent.atomic.*;
23
24 import org.junit.*;
25
26 import org.junit.rules.TestName;
27 import rx.Observable.OnSubscribe;
28 import rx.exceptions.MissingBackpressureException;
29 import rx.functions.*;
30 import rx.internal.util.RxRingBuffer;
31 import rx.observers.TestSubscriber;
32 import rx.schedulers.Schedulers;
33 import rx.test.TestObstructionDetection;
34
35 public class BackpressureTests {
36
37 @Rule
38 public TestName testName = new TestName();
39
40 @After
41 public void doAfterTest() {
42 TestObstructionDetection.checkObstruction();
43 }
44
45 @Test
46 public void testObserveOn() {
47 int NUM = (int) (RxRingBuffer.SIZE * 2.1);
48 AtomicInteger c = new AtomicInteger();
49 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
50 incrementingIntegers(c).observeOn(Schedulers.computation()).take(NUM).subscribe(ts);
51 ts.awaitTerminalEvent();
52 ts.assertNoErrors();
53 System.out.println("testObserveOn => Received: " + ts.getOnNextEvents().size() + " Emitted: " + c.get());
54 assertEquals(NUM, ts.getOnNextEvents().size());
55 assertTrue(c.get() < RxRingBuffer.SIZE * 4);
56 }
57
58 @Test
59 public void testObserveOnWithSlowConsumer() {
60 int NUM = (int) (RxRingBuffer.SIZE * 0.2);
61 AtomicInteger c = new AtomicInteger();
62 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
63 incrementingIntegers(c).observeOn(Schedulers.computation()).map(new Func1<Integer, Integer>() {
64
65 @Override
66 public Integer call(Integer i) {
67 try {
68 Thread.sleep(1);
69 } catch (InterruptedException e) {
70 e.printStackTrace();
71 }
72 return i;
73 }
74
75 }).take(NUM).subscribe(ts);
76 ts.awaitTerminalEvent();
77 ts.assertNoErrors();
78 System.out.println("testObserveOnWithSlowConsumer => Received: " + ts.getOnNextEvents().size() + " Emitted: " + c.get());
79 assertEquals(NUM, ts.getOnNextEvents().size());
80 assertTrue(c.get() < RxRingBuffer.SIZE * 2);
81 }
82
83 @Test
84 public void testMergeSync() {
85 int NUM = (int) (RxRingBuffer.SIZE * 4.1);
86 AtomicInteger c1 = new AtomicInteger();
87 AtomicInteger c2 = new AtomicInteger();
88 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
89 Observable<Integer> merged = Observable.merge(incrementingIntegers(c1), incrementingIntegers(c2));
90
91 merged.take(NUM).subscribe(ts);
92 ts.awaitTerminalEvent();
93 ts.assertNoErrors();
94 System.out.println("Expected: " + NUM + " got: " + ts.getOnNextEvents().size());
95 System.out.println("testMergeSync => Received: " + ts.getOnNextEvents().size() + " Emitted: " + c1.get() + " / " + c2.get());
96 assertEquals(NUM, ts.getOnNextEvents().size());
97
98
99
100 assertTrue(c1.get() < RxRingBuffer.SIZE * 5);
101 assertTrue(c2.get() < RxRingBuffer.SIZE * 5);
102 }
103
104 @Test
105 public void testMergeAsync() {
106 int NUM = (int) (RxRingBuffer.SIZE * 4.1);
107 AtomicInteger c1 = new AtomicInteger();
108 AtomicInteger c2 = new AtomicInteger();
109 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
110 Observable<Integer> merged = Observable.merge(
111 incrementingIntegers(c1).subscribeOn(Schedulers.computation()),
112 incrementingIntegers(c2).subscribeOn(Schedulers.computation()));
113
114 merged.take(NUM).subscribe(ts);
115 ts.awaitTerminalEvent();
116 ts.assertNoErrors();
117 System.out.println("testMergeAsync => Received: " + ts.getOnNextEvents().size() + " Emitted: " + c1.get() + " / " + c2.get());
118 assertEquals(NUM, ts.getOnNextEvents().size());
119
120
121
122 assertTrue(c1.get() < RxRingBuffer.SIZE * 5);
123 assertTrue(c2.get() < RxRingBuffer.SIZE * 5);
124 }
125
126 @Test
127 public void testMergeAsyncThenObserveOn() {
128 int NUM = (int) (RxRingBuffer.SIZE * 4.1);
129 AtomicInteger c1 = new AtomicInteger();
130 AtomicInteger c2 = new AtomicInteger();
131 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
132 Observable<Integer> merged = Observable.merge(
133 incrementingIntegers(c1).subscribeOn(Schedulers.computation()),
134 incrementingIntegers(c2).subscribeOn(Schedulers.computation()));
135
136 merged.observeOn(Schedulers.newThread()).take(NUM).subscribe(ts);
137 ts.awaitTerminalEvent();
138 ts.assertNoErrors();
139 System.out.println("testMergeAsyncThenObserveOn => Received: " + ts.getOnNextEvents().size() + " Emitted: " + c1.get() + " / " + c2.get());
140 assertEquals(NUM, ts.getOnNextEvents().size());
141
142
143
144
145 assertTrue(c1.get() < RxRingBuffer.SIZE * 7);
146 assertTrue(c2.get() < RxRingBuffer.SIZE * 7);
147 }
148
149 @Test
150 public void testFlatMapSync() {
151 int NUM = (int) (RxRingBuffer.SIZE * 2.1);
152 AtomicInteger c = new AtomicInteger();
153 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
154 incrementingIntegers(c).flatMap(new Func1<Integer, Observable<Integer>>() {
155
156 @Override
157 public Observable<Integer> call(Integer i) {
158 return incrementingIntegers(new AtomicInteger()).take(10);
159 }
160
161 }).take(NUM).subscribe(ts);
162 ts.awaitTerminalEvent();
163 ts.assertNoErrors();
164 System.out.println("testFlatMapSync => Received: " + ts.getOnNextEvents().size() + " Emitted: " + c.get());
165 assertEquals(NUM, ts.getOnNextEvents().size());
166
167 assertTrue(c.get() < RxRingBuffer.SIZE);
168 }
169
170 @Test
171 @Ignore
172 public void testFlatMapAsync() {
173 int NUM = (int) (RxRingBuffer.SIZE * 2.1);
174 AtomicInteger c = new AtomicInteger();
175 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
176 incrementingIntegers(c).subscribeOn(Schedulers.computation()).flatMap(new Func1<Integer, Observable<Integer>>() {
177
178 @Override
179 public Observable<Integer> call(Integer i) {
180 return incrementingIntegers(new AtomicInteger()).take(10).subscribeOn(Schedulers.computation());
181 }
182
183 }).take(NUM).subscribe(ts);
184 ts.awaitTerminalEvent();
185 ts.assertNoErrors();
186 System.out.println("testFlatMapAsync => Received: " + ts.getOnNextEvents().size() + " Emitted: " + c.get() + " Size: " + RxRingBuffer.SIZE);
187 assertEquals(NUM, ts.getOnNextEvents().size());
188
189
190
191 assertTrue(c.get() <= RxRingBuffer.SIZE * 2);
192 }
193
194 @Test
195 public void testZipSync() {
196 int NUM = (int) (RxRingBuffer.SIZE * 4.1);
197 AtomicInteger c1 = new AtomicInteger();
198 AtomicInteger c2 = new AtomicInteger();
199 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
200 Observable<Integer> zipped = Observable.zip(
201 incrementingIntegers(c1),
202 incrementingIntegers(c2),
203 new Func2<Integer, Integer, Integer>() {
204
205 @Override
206 public Integer call(Integer t1, Integer t2) {
207 return t1 + t2;
208 }
209
210 });
211
212 zipped.take(NUM).subscribe(ts);
213 ts.awaitTerminalEvent();
214 ts.assertNoErrors();
215 System.out.println("testZipSync => Received: " + ts.getOnNextEvents().size() + " Emitted: " + c1.get() + " / " + c2.get());
216 assertEquals(NUM, ts.getOnNextEvents().size());
217 assertTrue(c1.get() < RxRingBuffer.SIZE * 5);
218 assertTrue(c2.get() < RxRingBuffer.SIZE * 5);
219 }
220
221 @Test
222 public void testZipAsync() {
223 int NUM = (int) (RxRingBuffer.SIZE * 2.1);
224 AtomicInteger c1 = new AtomicInteger();
225 AtomicInteger c2 = new AtomicInteger();
226 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
227 Observable<Integer> zipped = Observable.zip(
228 incrementingIntegers(c1).subscribeOn(Schedulers.computation()),
229 incrementingIntegers(c2).subscribeOn(Schedulers.computation()),
230 new Func2<Integer, Integer, Integer>() {
231
232 @Override
233 public Integer call(Integer t1, Integer t2) {
234 return t1 + t2;
235 }
236
237 });
238
239 zipped.take(NUM).subscribe(ts);
240 ts.awaitTerminalEvent();
241 ts.assertNoErrors();
242 System.out.println("testZipAsync => Received: " + ts.getOnNextEvents().size() + " Emitted: " + c1.get() + " / " + c2.get());
243 assertEquals(NUM, ts.getOnNextEvents().size());
244 assertTrue(c1.get() < RxRingBuffer.SIZE * 3);
245 assertTrue(c2.get() < RxRingBuffer.SIZE * 3);
246 }
247
248 @Test
249 public void testSubscribeOnScheduling() {
250
251 for (int i = 0; i < 100; i++) {
252 int NUM = (int) (RxRingBuffer.SIZE * 2.1);
253 AtomicInteger c = new AtomicInteger();
254 ConcurrentLinkedQueue<Thread> threads = new ConcurrentLinkedQueue<Thread>();
255 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
256
257 incrementingIntegers(c, threads).subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation()).take(NUM).subscribe(ts);
258 ts.awaitTerminalEvent();
259 ts.assertNoErrors();
260 System.out.println("testSubscribeOnScheduling => Received: " + ts.getOnNextEvents().size() + " Emitted: " + c.get());
261 assertEquals(NUM, ts.getOnNextEvents().size());
262 assertTrue(c.get() < RxRingBuffer.SIZE * 4);
263 Thread first = null;
264 for (Thread t : threads) {
265 System.out.println("testSubscribeOnScheduling => thread: " + t);
266 if (first == null) {
267 first = t;
268 } else {
269 if (!first.equals(t)) {
270 fail("Expected to see the same thread");
271 }
272 }
273 }
274 System.out.println("testSubscribeOnScheduling => Number of batch requests seen: " + threads.size());
275 assertTrue(threads.size() > 1);
276 System.out.println("-------------------------------------------------------------------------------------------");
277 }
278 }
279
280 @Test
281 public void testTakeFilterSkipChainAsync() {
282 int NUM = (int) (RxRingBuffer.SIZE * 2.1);
283 AtomicInteger c = new AtomicInteger();
284 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
285 incrementingIntegers(c).observeOn(Schedulers.computation())
286 .skip(10000)
287 .filter(new Func1<Integer, Boolean>() {
288
289 @Override
290 public Boolean call(Integer i) {
291 return i > 11000;
292 }
293
294 }).take(NUM).subscribe(ts);
295
296 ts.awaitTerminalEvent();
297 ts.assertNoErrors();
298
299
300
301
302
303 int expected = 10000 + 1000 + RxRingBuffer.SIZE * 3 + RxRingBuffer.SIZE / 2;
304
305 System.out.println("testTakeFilterSkipChain => Received: " + ts.getOnNextEvents().size() + " Emitted: " + c.get() + " Expected: " + expected);
306 assertEquals(NUM, ts.getOnNextEvents().size());
307 assertTrue(c.get() < expected);
308 }
309
310 @Test
311 public void testUserSubscriberUsingRequestSync() {
312 AtomicInteger c = new AtomicInteger();
313 final AtomicInteger totalReceived = new AtomicInteger();
314 final AtomicInteger batches = new AtomicInteger();
315 final AtomicInteger received = new AtomicInteger();
316 incrementingIntegers(c).subscribe(new Subscriber<Integer>() {
317
318 @Override
319 public void onStart() {
320 request(100);
321 }
322
323 @Override
324 public void onCompleted() {
325
326 }
327
328 @Override
329 public void onError(Throwable e) {
330
331 }
332
333 @Override
334 public void onNext(Integer t) {
335 int total = totalReceived.incrementAndGet();
336 received.incrementAndGet();
337 if (total >= 2000) {
338 unsubscribe();
339 }
340 if (received.get() == 100) {
341 batches.incrementAndGet();
342 request(100);
343 received.set(0);
344 }
345 }
346
347 });
348
349 System.out.println("testUserSubscriberUsingRequestSync => Received: " + totalReceived.get() + " Emitted: " + c.get() + " Request Batches: " + batches.get());
350 assertEquals(2000, c.get());
351 assertEquals(2000, totalReceived.get());
352 assertEquals(20, batches.get());
353 }
354
355 @Test
356 public void testUserSubscriberUsingRequestAsync() throws InterruptedException {
357 AtomicInteger c = new AtomicInteger();
358 final AtomicInteger totalReceived = new AtomicInteger();
359 final AtomicInteger received = new AtomicInteger();
360 final AtomicInteger batches = new AtomicInteger();
361 final CountDownLatch latch = new CountDownLatch(1);
362 incrementingIntegers(c).subscribeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>() {
363
364 @Override
365 public void onStart() {
366 request(100);
367 }
368
369 @Override
370 public void onCompleted() {
371 latch.countDown();
372 }
373
374 @Override
375 public void onError(Throwable e) {
376 latch.countDown();
377 }
378
379 @Override
380 public void onNext(Integer t) {
381 int total = totalReceived.incrementAndGet();
382 received.incrementAndGet();
383 boolean done = false;
384 if (total >= 2000) {
385 done = true;
386 unsubscribe();
387 }
388 if (received.get() == 100) {
389 batches.incrementAndGet();
390 received.set(0);
391 if (!done) {
392 request(100);
393 }
394 }
395 if (done) {
396 latch.countDown();
397 }
398 }
399
400 });
401
402 latch.await();
403 System.out.println("testUserSubscriberUsingRequestAsync => Received: " + totalReceived.get() + " Emitted: " + c.get() + " Request Batches: " + batches.get());
404 assertEquals(2000, c.get());
405 assertEquals(2000, totalReceived.get());
406 assertEquals(20, batches.get());
407 }
408
409 @Test(timeout = 2000)
410 public void testFirehoseFailsAsExpected() {
411 AtomicInteger c = new AtomicInteger();
412 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
413 firehose(c).observeOn(Schedulers.computation()).map(SLOW_PASS_THRU).subscribe(ts);
414 ts.awaitTerminalEvent();
415 System.out.println("testFirehoseFailsAsExpected => Received: " + ts.getOnNextEvents().size() + " Emitted: " + c.get());
416 assertEquals(1, ts.getOnErrorEvents().size());
417 assertTrue(ts.getOnErrorEvents().get(0) instanceof MissingBackpressureException);
418 }
419
420 @Test(timeout = 10000)
421 public void testOnBackpressureDrop() {
422 for (int i = 0; i < 100; i++) {
423 int NUM = (int) (RxRingBuffer.SIZE * 1.1);
424 AtomicInteger c = new AtomicInteger();
425 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
426 firehose(c).onBackpressureDrop()
427 .observeOn(Schedulers.computation())
428 .map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
429 ts.awaitTerminalEvent();
430 ts.assertNoErrors();
431
432 List<Integer> onNextEvents = ts.getOnNextEvents();
433 assertEquals(NUM, onNextEvents.size());
434
435 Integer lastEvent = onNextEvents.get(NUM - 1);
436
437 System.out.println("testOnBackpressureDrop => Received: " + onNextEvents.size() + " Emitted: " + c.get() + " Last value: " + lastEvent);
438
439 assertTrue(NUM - 1 <= lastEvent.intValue());
440 }
441 }
442
443 @Test(timeout = 10000)
444 public void testOnBackpressureDropWithAction() {
445 for (int i = 0; i < 100; i++) {
446 final AtomicInteger emitCount = new AtomicInteger();
447 final AtomicInteger dropCount = new AtomicInteger();
448 final AtomicInteger passCount = new AtomicInteger();
449 final int NUM = RxRingBuffer.SIZE * 3;
450 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
451 firehose(emitCount).onBackpressureDrop(new Action1<Integer>() {
452 @Override
453 public void call(Integer i) {
454 dropCount.incrementAndGet();
455 }
456 })
457 .doOnNext(new Action1<Integer>() {
458 @Override
459 public void call(Integer integer) {
460 passCount.incrementAndGet();
461 }
462 })
463 .observeOn(Schedulers.computation())
464 .map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
465 ts.awaitTerminalEvent();
466 ts.assertNoErrors();
467
468 List<Integer> onNextEvents = ts.getOnNextEvents();
469 Integer lastEvent = onNextEvents.get(NUM - 1);
470 System.out.println(testName.getMethodName() + " => Received: " + onNextEvents.size() + " Passed: " + passCount.get() + " Dropped: " + dropCount.get() + " Emitted: " + emitCount.get() + " Last value: " + lastEvent);
471 assertEquals(NUM, onNextEvents.size());
472
473 assertTrue(NUM <= passCount.get());
474
475 assertTrue(NUM - 1 <= lastEvent.intValue());
476 assertTrue(0 < dropCount.get());
477 assertEquals(emitCount.get(), passCount.get() + dropCount.get());
478 }
479 }
480
481 @Test(timeout = 10000)
482 public void testOnBackpressureDropSynchronous() {
483 for (int i = 0; i < 100; i++) {
484 int NUM = (int) (RxRingBuffer.SIZE * 1.1);
485 AtomicInteger c = new AtomicInteger();
486 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
487 firehose(c).onBackpressureDrop()
488 .map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
489 ts.awaitTerminalEvent();
490 ts.assertNoErrors();
491
492 List<Integer> onNextEvents = ts.getOnNextEvents();
493 assertEquals(NUM, onNextEvents.size());
494
495 Integer lastEvent = onNextEvents.get(NUM - 1);
496
497 System.out.println("testOnBackpressureDrop => Received: " + onNextEvents.size() + " Emitted: " + c.get() + " Last value: " + lastEvent);
498
499 assertTrue(NUM - 1 <= lastEvent.intValue());
500 }
501 }
502
503 @Test(timeout = 10000)
504 public void testOnBackpressureDropSynchronousWithAction() {
505 for (int i = 0; i < 100; i++) {
506 final AtomicInteger dropCount = new AtomicInteger();
507 int NUM = (int) (RxRingBuffer.SIZE * 1.1);
508 AtomicInteger c = new AtomicInteger();
509 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
510 firehose(c).onBackpressureDrop(new Action1<Integer>() {
511 @Override
512 public void call(Integer i) {
513 dropCount.incrementAndGet();
514 }
515 })
516 .map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
517 ts.awaitTerminalEvent();
518 ts.assertNoErrors();
519
520 List<Integer> onNextEvents = ts.getOnNextEvents();
521 assertEquals(NUM, onNextEvents.size());
522
523 Integer lastEvent = onNextEvents.get(NUM - 1);
524
525 System.out.println("testOnBackpressureDrop => Received: " + onNextEvents.size() + " Dropped: " + dropCount.get() + " Emitted: " + c.get() + " Last value: " + lastEvent);
526
527 assertTrue(NUM - 1 <= lastEvent.intValue());
528
529 assertEquals(0, dropCount.get());
530 assertEquals(c.get(), onNextEvents.size());
531 }
532 }
533
534 @Test(timeout = 2000)
535 public void testOnBackpressureBuffer() {
536 int NUM = (int) (RxRingBuffer.SIZE * 1.1);
537 AtomicInteger c = new AtomicInteger();
538 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
539 firehose(c).takeWhile(new Func1<Integer, Boolean>() {
540
541 @Override
542 public Boolean call(Integer t1) {
543 return t1 < 100000;
544 }
545
546 }).onBackpressureBuffer().observeOn(Schedulers.computation()).map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
547 ts.awaitTerminalEvent();
548 ts.assertNoErrors();
549 System.out.println("testOnBackpressureBuffer => Received: " + ts.getOnNextEvents().size() + " Emitted: " + c.get());
550 assertEquals(NUM, ts.getOnNextEvents().size());
551
552 assertEquals(NUM - 1, ts.getOnNextEvents().get(NUM - 1).intValue());
553 }
554
555
556
557
558
559
560
561 private static Observable<Integer> incrementingIntegers(final AtomicInteger counter) {
562 return incrementingIntegers(counter, null);
563 }
564
565 private static Observable<Integer> incrementingIntegers(final AtomicInteger counter, final ConcurrentLinkedQueue<Thread> threadsSeen) {
566 return Observable.create(new OnSubscribe<Integer>() {
567
568 final AtomicLong requested = new AtomicLong();
569
570 @Override
571 public void call(final Subscriber<? super Integer> s) {
572 s.setProducer(new Producer() {
573 int i = 0;
574
575 @Override
576 public void request(long n) {
577 if (n == 0) {
578
579 return;
580 }
581 if (threadsSeen != null) {
582 threadsSeen.offer(Thread.currentThread());
583 }
584 long _c = requested.getAndAdd(n);
585 if (_c == 0) {
586 while (!s.isUnsubscribed()) {
587 counter.incrementAndGet();
588 s.onNext(i++);
589 if (requested.decrementAndGet() == 0) {
590
591 return;
592 }
593 }
594 }
595 }
596
597 });
598 }
599
600 });
601 }
602
603
604
605
606
607
608
609 private static Observable<Integer> firehose(final AtomicInteger counter) {
610 return Observable.create(new OnSubscribe<Integer>() {
611
612 int i = 0;
613
614 @Override
615 public void call(final Subscriber<? super Integer> s) {
616 while (!s.isUnsubscribed()) {
617 s.onNext(i++);
618 counter.incrementAndGet();
619 }
620 System.out.println("unsubscribed after: " + i);
621 }
622
623 });
624 }
625
626 final static Func1<Integer, Integer> SLOW_PASS_THRU = new Func1<Integer, Integer>() {
627 volatile int sink;
628 @Override
629 public Integer call(Integer t1) {
630
631 String t = "";
632 int s = sink;
633 for (int i = 1000; i >= 0; i--) {
634 t = String.valueOf(i + t.hashCode() + s);
635 }
636 sink = t.hashCode();
637 return t1;
638 }
639
640 };
641 }